Workflowsで使えるBigQueryのqueryコネクタを用いてBigQueryにクエリ発行する時はtimeoutMsフィールドに注意
概要
WorkflowsのBigQueryコネクタ(query)を用いるとWorkflowsからBigQueryにSQLを実行することができます。
Method: googleapis.bigquery.v2.jobs.query
とっても便利なqueryコネクタですが、注意が必要な点があったので記事にしてみました。
注意点
queryコネクタのリファレンスには以下の通り概要が記載されています。
Runs a BigQuery SQL query synchronously and returns query results if the query completes within a specified timeout.
BigQueryのSQLクエリを同期的に実行し、指定したタイムアウト内にクエリが完了した場合にクエリ結果を返します。
つまりこのコネクタは同期的に実行されるので、SELECT文などで結果を取得してWorkflowsで扱う場合にポーリングをして待つ必要がないものとなります。
そして重要なのは指定したタイムアウト
という箇所です。
queryコネクタは実行時の引数でタイムアウト時間を設定することができます。
タイムアウト時間を設定する引数はtimeoutMs
というものですが、この指定を省略すると10秒でリクエストがタイムアウトします。
引数 | 説明 |
---|---|
timeoutMs | [Optional] How long to wait for the query to complete, in milliseconds, before the request times out and returns. Note that this is only a timeout for the request, not the query. If the query takes longer to run than the timeout value, the call returns without any results and with the 'jobComplete' flag set to false. You can call GetQueryResults() to wait for the query to complete and read the results. The default value is 10000 milliseconds (10 seconds). [オプション] リクエストがタイムアウトしてリターンするまでの、クエリが完了するまでの待ち時間をミリ秒単位で指定します。 これはリクエストのタイムアウトであり、クエリのタイムアウトではないことに注意してください。 クエリの実行にタイムアウト値よりも長い時間がかかった場合、コール結果は返されず、 'jobComplete' フラグが false に設定されます。 GetQueryResults() を呼び出して、クエリの完了を待ち、結果を読み取ることができます。 既定値は 10000 ミリ秒 (10 秒) です(DeepL翻訳)。 |
タイムアウトといってもクエリが終了されるわけではありません。WorkflowsがBigQueryへクエリ実行した後にBigQueryからの結果返却までの時間のタイムアウトとなります。
イメージとしては以下となります(あくまでイメージ図です)
なのでtimeoutMs
を省略したqueryコネクタを用いた実装を行った場合は、10秒以上かかるクエリの場合はBigQuery側ではSQLは実行されているが、Workflowsへ結果が返却されないということになります。この場合、実行結果をjobidでポーリングして待機するか、またはgetQueryResults
コネクタを用いて結果取得する必要があります。
実際に試してみる
準備
前提として10秒以上かかるクエリを用意する必要があります(これが一番大変でした)。
外部テーブルを用意して、Workflowsからクエリ発行します。3GBのJSONLinesのデータを作成して、COUNTを取るというクエリで大体15秒程度でした。
一応以下にダミーデータ作成のスクリプト、DDL、SQLを示します。本筋ではないので解説はしません。
- スクリプト
import json
import random
import string
import os
# 3GBの目標サイズ(バイト単位)
TARGET_SIZE = 3 * 1024 * 1024 * 1024
# 出力ファイル名
OUTPUT_FILE = "large_file.jsonl"
def random_string(length=10):
"""ランダムな文字列を生成する"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def random_record():
"""ランダムなレコードを生成する"""
return {
"id": random.randint(1, 1000000),
"name": random_string(10),
"email": f"{random_string(5)}@example.com",
"age": random.randint(18, 99),
"address": {
"city": random_string(8),
"zipcode": random.randint(10000, 99999),
},
"is_active": random.choice([True, False]),
"tags": [random_string(5) for _ in range(random.randint(1, 5))]
}
def create_large_jsonlines_file():
"""JSONLinesファイルを作成する"""
with open(OUTPUT_FILE, "w") as f:
current_size = 0
while current_size < TARGET_SIZE:
record = random_record()
json_record = json.dumps(record)
f.write(json_record + "\n") # 各レコードを1行に書き込む
current_size += len(json_record.encode("utf-8")) + 1 # +1は改行文字の分
print(f"JSONLinesファイル '{OUTPUT_FILE}' が作成されました。サイズ: {os.path.getsize(OUTPUT_FILE) / (1024 * 1024):.2f} MB")
if __name__ == "__main__":
create_large_jsonlines_file()
- DDL
CREATE OR REPLACE EXTERNAL TABLE tmp_nemoto.test(
id INT64,
name STRING,
email STRING,
age INT64,
address STRUCT<
city STRING,
zipcode INT64
>,
is_active BOOL,
tags ARRAY<STRING>
)
OPTIONS (
format = 'json',
uris = ['gs://バケット名/bq_data/large_file*.jsonl']);
- SQL
SELECT
COUNT(*) AS table_count
FROM データセット名.test
Workflowsを実行してみる
実行するSQLファイルをCloud Storageバケットに保存しておいて、それを読み取ってSQLを実行するワークフローを用意しました。
以下はワークフローのソース全文です。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- sqlBucket: "バケット名"
- sqlFilePath: "SQLファイル名"
- readSqlFiles:
call: googleapis.storage.v1.objects.get
args:
bucket: ${sqlBucket}
object: ${text.url_encode(sqlFilePath)}
alt: "media"
result: sqlFileContent
- executeQuery:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${project_id}
body:
query: ${text.decode(sqlFileContent)}
useLegacySql: false
useQueryCache: false
#timeoutMs: 20000 まずはコメントアウト
result: queryResult
- executeQueryLog:
call: sys.log
args:
text: ${queryResult}
severity: "INFO"
- returnStep:
return: ${queryResult}
まずtimeoutMs
を設定しない状態で実行します。キャッシュされてしまうと意図したタイムアウトしなくなってしまうのでuseQueryCache
をfalse(キャッシュしない)に設定しています。
実行すると以下のレスポンスが出力されます。
{
"jobComplete": false,
"jobCreationReason": {
"code": "REQUESTED"
},
"jobReference": {
"jobId": "job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7",
"location": "asia-northeast1",
"projectId": "プロジェクトID"
},
"kind": "bigquery#queryResponse",
"queryId": "job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7"
}
jobComplete
がfalse
になっているのでジョブ実行中にタイムアウトとなっていることがわかります。
BigQueryのジョブID(queryId)が出ているのでINFORMATION_SCHEMA.JOBS Viewから本当に10秒以上実行時間がかかっていたかみてみます。
SELECT
job_id,
state,
TIMESTAMP_DIFF(end_time, start_time, SECOND) AS execution_time_seconds
FROM
`region-asia-northeast1`.`INFORMATION_SCHEMA.JOBS`
WHERE job_id = 'job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7';
11秒かかっていることが確認できました。
では、今度はtimeoutMs
を20000ミリ秒(=20秒)に設定して実行してみます。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- sqlBucket: "バケット名"
- sqlFilePath: "SQLファイル名"
- readSqlFiles:
call: googleapis.storage.v1.objects.get
args:
bucket: ${sqlBucket}
object: ${text.url_encode(sqlFilePath)}
alt: "media"
result: sqlFileContent
- executeQuery:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${project_id}
body:
query: ${text.decode(sqlFileContent)}
useLegacySql: false
useQueryCache: false
timeoutMs: 20000 #コメントアウト解除
result: queryResult
- executeQueryLog:
call: sys.log
args:
text: ${queryResult}
severity: "INFO"
- returnStep:
return: ${queryResult}
実行結果を見てみます。
実行時間は11秒ちょいで、SQLの実行は完了してクエリ結果も返ってきているように見えますね!
こちらもJSONをみてみます。
{
"cacheHit": false,
"jobComplete": true,
"jobCreationReason": {
"code": "REQUESTED"
},
"jobReference": {
"jobId": "job_kXqnrVt-7mxMmB0sde_ausdLsbnJ",
"location": "asia-northeast1",
"projectId": "プロジェクトID"
},
"kind": "bigquery#queryResponse",
"queryId": "job_kXqnrVt-7mxMmB0sde_ausdLsbnJ",
"rows": [
{
"f": [
{
"v": "17376189"
}
]
}
],
"schema": {
"fields": [
{
"mode": "NULLABLE",
"name": "table_count",
"type": "INTEGER"
}
]
},
"totalBytesProcessed": "3221225601",
"totalRows": "1"
}
jobComplete
がtrue
になっていますね。結果も期待した値(17376189)が返ってきています。
※テーブルのデータはランダムで作成しているので作成タイミングによって期待する値は異なります。
上記より、timeoutMs
を設定することで10秒以上かかるクエリでもWorkflowsで結果取得することができました。
おまけ
timeoutMs
に設定できる上限値は記載がありませんでしたが、BigQueryのSQLの実行時間上限の6時間(=21,600,000ミリ秒
)は設定することができました。
まとめ
queryコネクタを用いてBigQueryにSQLを実行して、Workflows側に取得した結果やSQLの実行状態(失敗したかどうか)を検知したい場合はtimeoutMs
を設定するようにしましょう。
逆に、そういったことが不要な場合はtimeoutMs
を設定せずに(デフォルト10秒が設定されますが)実装してしまっても良いのかなと思います。
意外と見落とすことがあるかもしれない引っかかりポイントかもと思ったので記事にしてみました。
それではまた、ナマステー